
2025 iThome 鐵人賽

DAY 26

In the previous posts we built the basic conversation system. In this post we'll use GCP's existing managed AI services to build the RAG retrieval system.

Goal: implement RAG with as little code as possible by leaning on GCP managed services.

1) The GCP RAG service ecosystem

flowchart TB
    subgraph "User Layer"
        USER[User query] --> CHAT[chat-service]
    end

    subgraph "GCP Managed AI Services"
        AGENT[Vertex AI Agent Builder<br/>Unified RAG entry point]
        SEARCH[Vertex AI Search<br/>Enterprise search engine]
        CONV[Vertex AI Conversation<br/>Conversation management]
        DOC_AI[Document AI<br/>Document parsing]
        DISCOVERY[Discovery Engine<br/>Knowledge discovery]
    end

    subgraph "Data Storage"
        GCS[Cloud Storage<br/>Raw documents]
        BQ[BigQuery<br/>Structured data + vectors]
        FIRESTORE[Firestore<br/>Metadata]
    end

    subgraph "Processing Pipeline"
        WORKFLOW[Workflows<br/>Document processing pipeline]
        FUNC[Cloud Functions<br/>Triggers]
    end

    USER --> AGENT
    AGENT --> SEARCH
    AGENT --> CONV
    CHAT --> AGENT

    GCS --> DOC_AI
    DOC_AI --> SEARCH
    SEARCH --> BQ

    FUNC --> WORKFLOW
    WORKFLOW --> DOC_AI
    WORKFLOW --> SEARCH

Core architectural idea: orchestrate services instead of reimplementing them

| Layer | GCP service | Our role |
| --- | --- | --- |
| Document parsing | Document AI | Configure + trigger |
| Vector retrieval | Vertex AI Search | Configure + query |
| Conversation management | Agent Builder | Configure + integrate |
| Workflow orchestration | Workflows | YAML configuration |
| Total | 80% managed services | 20% glue code |

2) Vertex AI Agent Builder: a one-stop RAG solution

What is Agent Builder?

Vertex AI Agent Builder is Google's enterprise-grade conversational AI platform, with built-in:

  • Document indexing and retrieval
  • Natural language understanding
  • Conversation state management
  • Multi-turn conversation support
  • Integration with Gemini models

Creating the Agent Builder application

#!/bin/bash
# scripts/setup-agent-builder.sh

PROJECT_ID="your-project-id"
LOCATION="global"  # Agent Builder currently only supports the global location
APP_ID="ai-assistant-rag"

echo "🤖 Creating the Vertex AI Agent Builder application..."

# 1. Enable the APIs
gcloud services enable discoveryengine.googleapis.com
gcloud services enable aiplatform.googleapis.com

# 2. Create the Search App
gcloud alpha discovery-engine search-applications create $APP_ID \
    --location=$LOCATION \
    --display-name="AI Assistant RAG" \
    --solution-type=SOLUTION_TYPE_SEARCH \
    --industry-vertical=GENERIC

# 3. Create the Data Store
gcloud alpha discovery-engine data-stores create "${APP_ID}-datastore" \
    --location=$LOCATION \
    --display-name="AI Assistant Documents" \
    --content-config=CONTENT_REQUIRED \
    --solution-type=SOLUTION_TYPE_SEARCH \
    --industry-vertical=GENERIC

# 4. Create the Conversation App
gcloud alpha discovery-engine chat-applications create "${APP_ID}-chat" \
    --location=$LOCATION \
    --display-name="AI Assistant Chat" \
    --search-application="projects/$PROJECT_ID/locations/$LOCATION/searchApplications/$APP_ID"

echo "✅ Agent Builder application created"
echo "📝 Application ID: $APP_ID"
echo "🔗 Management console: https://console.cloud.google.com/ai/discovery-engine"

Agent Builder configuration file

# config/agent-builder-config.yaml
# Conversation application settings

displayName: "AI Assistant RAG"
description: "Intelligent document Q&A assistant"

# Search configuration
searchConfig:
  searchTier: SEARCH_TIER_ENTERPRISE  # Enterprise-tier search
  extractiveContent: true             # Extractive content
  maxExtractiveAnswerCount: 3         # At most 3 answer snippets
  maxExtractiveSegmentCount: 5        # At most 5 content segments

# Conversation configuration
conversationConfig:
  agentCreation:
    business: "Enterprise knowledge assistant"
    description: "I am a professional enterprise knowledge assistant who can help you look up documents, policies, processes, and more"

  # Custom instructions
  agentInstructions: |
    You are a professional enterprise knowledge assistant. Follow these guidelines:

    1. Answer questions based on the retrieved document content
    2. If the information is insufficient, say so honestly
    3. Always cite the source of the information
    4. Respond in a friendly, professional tone
    5. Support both Chinese and English queries

# Safety and filtering
safetySettings:
  - category: HARM_CATEGORY_HATE_SPEECH
    threshold: BLOCK_MEDIUM_AND_ABOVE
  - category: HARM_CATEGORY_DANGEROUS_CONTENT
    threshold: BLOCK_MEDIUM_AND_ABOVE
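
The maxExtractiveAnswerCount / maxExtractiveSegmentCount settings above also have per-request counterparts in the Python client, which is handy when different callers need different limits. A minimal sketch, assuming the ContentSearchSpec API of google-cloud-discoveryengine and the serving config path used later in this post:

# Per-request equivalent of the extractive settings in the config file above.
from google.cloud import discoveryengine_v1 as discoveryengine

content_search_spec = discoveryengine.SearchRequest.ContentSearchSpec(
    extractive_content_spec=discoveryengine.SearchRequest.ContentSearchSpec.ExtractiveContentSpec(
        max_extractive_answer_count=3,   # at most 3 answer snippets
        max_extractive_segment_count=5,  # at most 5 content segments
    )
)

request = discoveryengine.SearchRequest(
    serving_config=(
        "projects/your-project-id/locations/global/"
        "searchApplications/ai-assistant-rag/servingConfigs/default_config"
    ),
    query="What is the travel reimbursement policy?",
    content_search_spec=content_search_spec,
)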

3) Automating document processing: Workflows + Document AI

The document processing workflow

# workflows/document-processing.yaml
main:
  params: [event]
  steps:
    - init:
        assign:
          - project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - location: "us-central1"
          - processor_id: "your-document-ai-processor-id"
          - bucket_name: ${event.bucket}
          - file_name: ${event.name}

    - check_file_type:
        switch:
          - condition: ${text.match_regex(file_name, ".*\\.(pdf|docx|txt)$")}
            next: process_document
        next: end

    - process_document:
        call: process_with_document_ai
        args:
          project_id: ${project_id}
          location: ${location}
          processor_id: ${processor_id}
          gcs_uri: ${"gs://" + bucket_name + "/" + file_name}
        result: processed_content

    - extract_metadata:
        assign:
          - document_metadata:
              filename: ${file_name}
              processing_time: ${time.format(sys.now())}
              content_length: ${len(processed_content.text)}
              page_count: ${len(processed_content.pages)}

    - index_in_discovery_engine:
        call: http.post
        args:
          url: ${"https://discoveryengine.googleapis.com/v1/projects/" + project_id + "/locations/global/dataStores/ai-assistant-datastore/documents:import"}
          auth:
            type: OAuth2
          headers:
            Content-Type: "application/json"
          body:
            documents:
              - id: ${text.replace_all(file_name, "[^a-zA-Z0-9]", "_")}
                struct_data:
                  title: ${file_name}
                  content: ${processed_content.text}
                  metadata: ${document_metadata}
        result: import_result

    - log_completion:
        call: sys.log
        args:
          text: ${"Document processed successfully: " + file_name}
          severity: INFO

# Document AI processing subworkflow
process_with_document_ai:
  params: [project_id, location, processor_id, gcs_uri]
  steps:
    - call_document_ai:
        call: http.post
        args:
          url: ${"https://documentai.googleapis.com/v1/projects/" + project_id + "/locations/" + location + "/processors/" + processor_id + ":process"}
          auth:
            type: OAuth2
          headers:
            Content-Type: "application/json"
          body:
            # Point Document AI at the file in GCS (gcs_document is a top-level
            # field of the process request; assumes PDF input, adjust mime_type otherwise)
            gcs_document:
              gcs_uri: ${gcs_uri}
              mime_type: "application/pdf"
        result: api_response

    - extract_text:
        return:
          text: ${api_response.body.document.text}
          pages: ${api_response.body.document.pages}

Trigger setup

#!/bin/bash
# scripts/setup-document-triggers.sh

echo "🔧 Setting up the document processing trigger..."

# 1. Deploy the Workflow
gcloud workflows deploy document-processing \
    --source=workflows/document-processing.yaml \
    --location=asia-east1

# 2. Create the Cloud Function trigger (the entry file must be named main.py)
cat > /tmp/main.py << 'EOF'
import functions_framework
from google.cloud.workflows import executions_v1
import json
import os

PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "your-project-id")

@functions_framework.cloud_event
def trigger_document_processing(cloud_event):
    # Triggered when a file is uploaded to GCS
    data = cloud_event.data

    # Only process specific file types
    if not data['name'].endswith(('.pdf', '.docx', '.txt')):
        return

    # Trigger the Workflow
    client = executions_v1.ExecutionsClient()

    execution = {
        'argument': json.dumps({
            'bucket': data['bucket'],
            'name': data['name'],
            'time_created': data['timeCreated']
        })
    }

    parent = f"projects/{PROJECT_ID}/locations/asia-east1/workflows/document-processing"

    response = client.create_execution(
        parent=parent,
        execution=execution
    )

    print(f"Started workflow execution: {response.name}")
EOF

cat > /tmp/requirements.txt << 'EOF'
functions-framework==3.*
google-cloud-workflows==1.*
EOF

# 3. Deploy the Cloud Function (2nd gen, required for --trigger-event-filters)
gcloud functions deploy trigger-document-processing \
    --gen2 \
    --runtime=python310 \
    --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
    --trigger-event-filters="bucket=your-documents-bucket" \
    --source=/tmp \
    --entry-point=trigger_document_processing \
    --region=asia-east1 \
    --set-env-vars="GCP_PROJECT_ID=your-project-id"

echo "✅ Trigger setup complete"

4) A simplified RAG service implementation

Main service classes

# services/rag/app/agent_builder_client.py
from google.cloud import discoveryengine_v1 as discoveryengine
from typing import Dict, Any, List, Optional
import logging

logger = logging.getLogger(__name__)

class AgentBuilderClient:
    """Vertex AI Agent Builder 客戶端"""

    def __init__(self, project_id: str, location: str = "global"):
        self.project_id = project_id
        self.location = location

        # Initialize the API clients
        self.search_client = discoveryengine.SearchServiceClient()
        self.conversation_client = discoveryengine.ConversationalSearchServiceClient()

        # Resource paths
        self.search_app_path = f"projects/{project_id}/locations/{location}/searchApplications/ai-assistant-rag"
        self.conversation_app_path = f"projects/{project_id}/locations/{location}/conversationApps/ai-assistant-rag-chat"

    async def search_documents(
        self,
        query: str,
        page_size: int = 10,
        filters: Optional[str] = None
    ) -> Dict[str, Any]:
        """搜尋文件"""
        try:
            # 建立搜尋請求
            request = discoveryengine.SearchRequest(
                serving_config=f"{self.search_app_path}/servingConfigs/default_config",
                query=query,
                page_size=page_size,
                query_expansion_spec=discoveryengine.SearchRequest.QueryExpansionSpec(
                    condition=discoveryengine.SearchRequest.QueryExpansionSpec.Condition.AUTO
                ),
                spell_correction_spec=discoveryengine.SearchRequest.SpellCorrectionSpec(
                    mode=discoveryengine.SearchRequest.SpellCorrectionSpec.Mode.AUTO
                )
            )

            # Apply optional filters
            if filters:
                request.filter = filters

            # Execute the search
            response = self.search_client.search(request)

            # Process the results
            results = []
            for result in response.results:
                document = result.document
                results.append({
                    "id": document.id,
                    "title": document.struct_data.get("title", ""),
                    "content": self._extract_content(document),
                    "metadata": dict(document.struct_data.get("metadata", {})),
                    "score": getattr(result, 'relevance_score', 0.0)
                })

            return {
                "query": query,
                "results": results,
                "total_results": len(results),
                "search_mode": "agent_builder"
            }

        except Exception as e:
            logger.error(f"Agent Builder 搜尋失敗: {e}")
            raise

    async def conversational_search(
        self,
        query: str,
        conversation_id: Optional[str] = None,
        user_pseudo_id: Optional[str] = None
    ) -> Dict[str, Any]:
        """對話式搜尋"""
        try:
            # 建立對話請求
            request = discoveryengine.ConverseConversationRequest(
                name=f"{self.conversation_app_path}/conversations/{conversation_id or '-'}",
                query=discoveryengine.TextInput(input=query),
                serving_config=f"{self.conversation_app_path}/servingConfigs/default_config"
            )

            if user_pseudo_id:
                request.user_pseudo_id = user_pseudo_id

            # Run the conversational search
            response = self.conversation_client.converse_conversation(request)

            # Build the response payload
            return {
                "conversation_id": response.conversation.name.split('/')[-1],
                "reply": response.reply.summary.summary_text,
                "search_results": [
                    {
                        "title": result.title,
                        "content": result.document.struct_data.get("content", ""),
                        "uri": result.uri
                    }
                    for result in response.search_results
                ],
                "conversation_state": response.conversation.state.name
            }

        except Exception as e:
            logger.error(f"對話式搜尋失敗: {e}")
            raise

    def _extract_content(self, document) -> str:
        """提取文件內容"""
        if hasattr(document, 'struct_data') and 'content' in document.struct_data:
            return document.struct_data['content']
        elif hasattr(document, 'json_data'):
            return document.json_data
        else:
            return str(document)

class DocumentUploadService:
    """文件上傳服務"""

    def __init__(self, project_id: str, bucket_name: str):
        self.project_id = project_id
        self.bucket_name = bucket_name

        from google.cloud import storage
        self.storage_client = storage.Client()
        self.bucket = self.storage_client.bucket(bucket_name)

    async def create_upload_url(self, filename: str, user_id: str) -> Dict[str, str]:
        """建立預簽名上傳 URL"""
        try:
            # 生成唯一檔名
            import uuid
            unique_filename = f"{user_id}/{uuid.uuid4()}_{filename}"

            # Create the blob
            blob = self.bucket.blob(unique_filename)

            # Generate the presigned URL (timedelta avoids ambiguity in how an
            # integer expiration is interpreted)
            from datetime import timedelta
            upload_url = blob.generate_signed_url(
                version="v4",
                expiration=timedelta(seconds=3600),  # 1 hour
                method="PUT",
                content_type="application/octet-stream"
            )

            return {
                "upload_url": upload_url,
                "file_path": unique_filename,
                "expires_in": 3600
            }

        except Exception as e:
            logger.error(f"建立上傳 URL 失敗: {e}")
            raise

    async def list_documents(self, user_id: str) -> List[Dict[str, Any]]:
        """列出用戶文件"""
        try:
            blobs = self.bucket.list_blobs(prefix=f"{user_id}/")

            documents = []
            for blob in blobs:
                documents.append({
                    "name": blob.name.split('/')[-1],
                    "size": blob.size,
                    "uploaded_at": blob.time_created.isoformat(),
                    "content_type": blob.content_type
                })

            return documents

        except Exception as e:
            logger.error(f"列出文件失敗: {e}")
            raise
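
Before wiring these classes into FastAPI, it helps to poke at them from a small script. The following is a minimal usage sketch with placeholder project and bucket values; it only calls the methods defined above and assumes the module lives at app/agent_builder_client.py as in this post's layout.

# scripts/try_agent_builder_client.py
# Minimal manual test of AgentBuilderClient and DocumentUploadService.
import asyncio

from app.agent_builder_client import AgentBuilderClient, DocumentUploadService

async def main():
    agent_builder = AgentBuilderClient(project_id="your-project-id")
    upload_service = DocumentUploadService(
        project_id="your-project-id",
        bucket_name="your-documents-bucket",
    )

    # Keyword search against the indexed documents
    search = await agent_builder.search_documents("travel reimbursement policy", page_size=3)
    for hit in search["results"]:
        print(hit["title"], hit["score"])

    # Presigned URL for uploading a new document
    upload = await upload_service.create_upload_url("handbook.pdf", user_id="demo-user")
    print("Upload URL (truncated):", upload["upload_url"][:80])

asyncio.run(main())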

The simplified main service

# services/rag/app/main.py
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import logging

from .agent_builder_client import AgentBuilderClient, DocumentUploadService
from .models import SearchRequest, SearchResponse
from .config import settings

logger = logging.getLogger(__name__)

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Initialize the services
    app.state.agent_builder = AgentBuilderClient(
        project_id=settings.gcp_project_id,
        location="global"
    )

    app.state.upload_service = DocumentUploadService(
        project_id=settings.gcp_project_id,
        bucket_name=settings.documents_bucket
    )

    logger.info("✅ RAG Service (Agent Builder) 啟動完成")
    yield
    logger.info("🛑 RAG Service 關閉完成")

app = FastAPI(
    title="RAG Service (Agent Builder)",
    description="基於 Vertex AI Agent Builder 的智慧檢索服務",
    version="2.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Dependency injection
def get_agent_builder() -> AgentBuilderClient:
    return app.state.agent_builder

def get_upload_service() -> DocumentUploadService:
    return app.state.upload_service

@app.get("/health")
async def health_check():
    """健康檢查"""
    return {
        "status": "healthy",
        "service": "rag-service-agent-builder",
        "version": "2.0.0"
    }

@app.post("/search", response_model=SearchResponse)
async def search_documents(
    request: SearchRequest,
    agent_builder: AgentBuilderClient = Depends(get_agent_builder)
):
    """文件檢索(使用 Agent Builder)"""
    try:
        # 建立過濾器
        filters = None
        if request.document_filters:
            filter_parts = []
            for key, value in request.document_filters.items():
                filter_parts.append(f'{key}: "{value}"')
            filters = " AND ".join(filter_parts)

        # Execute the search
        result = await agent_builder.search_documents(
            query=request.query,
            page_size=request.top_k,
            filters=filters
        )

        # Convert to the standard response format
        search_results = [
            {
                "chunk_id": r["id"],
                "document_id": r["id"],
                "content": r["content"][:1000],  # 限制長度
                "score": r["score"],
                "document_metadata": r["metadata"],
                "chunk_metadata": {}
            }
            for r in result["results"]
        ]

        return SearchResponse(
            query=request.query,
            results=search_results,
            total_results=len(search_results),
            search_time_ms=0.0,  # Agent Builder does not report search timing
            search_mode="agent_builder"
        )

    except Exception as e:
        logger.error(f"搜尋失敗: {e}")
        raise HTTPException(status_code=500, detail="搜尋失敗")

@app.post("/chat")
async def conversational_search(
    query: str,
    conversation_id: str = None,
    user_id: str = None,
    agent_builder: AgentBuilderClient = Depends(get_agent_builder)
):
    """對話式搜尋"""
    try:
        result = await agent_builder.conversational_search(
            query=query,
            conversation_id=conversation_id,
            user_pseudo_id=user_id
        )

        return result

    except Exception as e:
        logger.error(f"對話式搜尋失敗: {e}")
        raise HTTPException(status_code=500, detail="對話式搜尋失敗")

@app.post("/documents/upload")
async def create_upload_url(
    filename: str,
    user_id: str,
    upload_service: DocumentUploadService = Depends(get_upload_service)
):
    """建立文件上傳 URL"""
    try:
        result = await upload_service.create_upload_url(filename, user_id)
        return result
    except Exception as e:
        logger.error(f"建立上傳 URL 失敗: {e}")
        raise HTTPException(status_code=500, detail="建立上傳 URL 失敗")

@app.get("/documents")
async def list_documents(
    user_id: str,
    upload_service: DocumentUploadService = Depends(get_upload_service)
):
    """列出用戶文件"""
    try:
        documents = await upload_service.list_documents(user_id)
        return {"documents": documents}
    except Exception as e:
        logger.error(f"列出文件失敗: {e}")
        raise HTTPException(status_code=500, detail="列出文件失敗")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8080, reload=True)
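
With the service running locally (for example via python -m uvicorn app.main:app --port 8080), a quick smoke test of the endpoints might look like the sketch below. It assumes the requests library is available as a dev dependency and that some documents have already been indexed; the /search payload only uses the fields the endpoint actually reads (query, top_k).

# scripts/smoke_test_rag_api.py
# Quick smoke test against the RAG service defined above.
import requests

BASE = "http://localhost:8080"

# Health check
print(requests.get(f"{BASE}/health").json())

# Document search
resp = requests.post(f"{BASE}/search", json={"query": "vacation policy", "top_k": 3})
print(resp.status_code, resp.json())

# Conversational search (query parameters, as defined by the /chat endpoint)
resp = requests.post(f"{BASE}/chat", params={"query": "How do I submit an expense report?"})
print(resp.status_code, resp.json())

# Request an upload URL for a new document
resp = requests.post(
    f"{BASE}/documents/upload",
    params={"filename": "handbook.pdf", "user_id": "demo-user"},
)
print(resp.status_code, resp.json())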

5) BigQuery vector search (advanced option)

Setting up BigQuery vector search

-- BigQuery vector search setup
-- scripts/setup-bigquery-vectors.sql

-- Create the dataset
CREATE SCHEMA IF NOT EXISTS `ai_assistant.rag_data`;

-- Create the documents table
CREATE OR REPLACE TABLE `ai_assistant.rag_data.documents` (
  document_id STRING NOT NULL,
  filename STRING NOT NULL,
  content STRING NOT NULL,   -- BigQuery uses STRING (TEXT is not a valid type)
  embedding ARRAY<FLOAT64>,  -- vector column
  metadata JSON,
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP(),
  updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP()
);

-- Create the vector search (cosine similarity) function
CREATE OR REPLACE FUNCTION `ai_assistant.rag_data.cosine_similarity`(
  vec1 ARRAY<FLOAT64>,
  vec2 ARRAY<FLOAT64>
) AS (
  -- Compute cosine similarity
  (
    SELECT
      SAFE_DIVIDE(
        dot_product,
        SQRT(norm1 * norm2)
      )
    FROM (
      SELECT
        SUM(v1 * v2) as dot_product,
        SUM(v1 * v1) as norm1,
        SUM(v2 * v2) as norm2
      FROM UNNEST(vec1) AS v1 WITH OFFSET pos1
      JOIN UNNEST(vec2) AS v2 WITH OFFSET pos2
      ON pos1 = pos2
    )
  )
);

-- Vector search stored procedure
CREATE OR REPLACE PROCEDURE `ai_assistant.rag_data.vector_search`(
  query_embedding ARRAY<FLOAT64>,
  top_k INT64
)
BEGIN
  SELECT
    document_id,
    filename,
    content,
    metadata,
    `ai_assistant.rag_data.cosine_similarity`(embedding, query_embedding) as similarity_score
  FROM `ai_assistant.rag_data.documents`
  WHERE embedding IS NOT NULL
  ORDER BY similarity_score DESC
  LIMIT top_k;
END;

BigQuery retrieval client

# services/rag/app/bigquery_client.py
from google.cloud import bigquery
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class BigQueryVectorClient:
    """BigQuery 向量檢索客戶端"""

    def __init__(self, project_id: str, dataset_id: str = "rag_data"):
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.client = bigquery.Client(project=project_id)

    async def search_vectors(
        self,
        query_embedding: List[float],
        top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """向量檢索"""
        try:
            # 建立查詢
            query = f"""
            CALL `{self.project_id}.{self.dataset_id}.vector_search`(
                {query_embedding},
                {top_k}
            );
            """

            # Run the query
            query_job = self.client.query(query)
            results = query_job.result()

            # Process the results
            search_results = []
            for row in results:
                search_results.append({
                    "document_id": row.document_id,
                    "filename": row.filename,
                    "content": row.content,
                    "metadata": dict(row.metadata) if row.metadata else {},
                    "score": float(row.similarity_score)
                })

            return search_results

        except Exception as e:
            logger.error(f"BigQuery 向量檢索失敗: {e}")
            raise

    async def insert_document(
        self,
        document_id: str,
        filename: str,
        content: str,
        embedding: List[float],
        metadata: Dict[str, Any] = None
    ):
        """插入文件和向量"""
        try:
            table_ref = self.client.dataset(self.dataset_id).table("documents")
            table = self.client.get_table(table_ref)

            rows = [{
                "document_id": document_id,
                "filename": filename,
                "content": content,
                "embedding": embedding,
                "metadata": metadata or {}
            }]

            errors = self.client.insert_rows_json(table, rows)

            if errors:
                raise Exception(f"插入失敗: {errors}")

        except Exception as e:
            logger.error(f"插入文件失敗: {e}")
            raise
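
To populate and query the embedding column you still need an embedding model on the side. The sketch below is one way to do it with the Vertex AI text-embedding API plus the client above; the model name, region, and module path are assumptions, and documents must be embedded with the same model as the queries.

# scripts/bigquery_vector_demo.py
# Embed a query with Vertex AI and search the BigQuery documents table.
# Assumptions: the vertexai SDK is installed, "text-embedding-004" is available
# in your region, and documents were inserted with embeddings from the same model.
import asyncio

import vertexai
from vertexai.language_models import TextEmbeddingModel

from app.bigquery_client import BigQueryVectorClient

PROJECT_ID = "your-project-id"

async def main():
    vertexai.init(project=PROJECT_ID, location="us-central1")
    model = TextEmbeddingModel.from_pretrained("text-embedding-004")

    # Embed the query text
    query_embedding = model.get_embeddings(
        ["How many vacation days do new employees get?"]
    )[0].values

    # Search the documents table by cosine similarity
    client = BigQueryVectorClient(project_id=PROJECT_ID)
    results = await client.search_vectors(query_embedding, top_k=5)
    for r in results:
        print(f'{r["score"]:.3f}  {r["filename"]}')

asyncio.run(main())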

6) Deployment configuration (greatly simplified)

Docker configuration (ultra-lightweight)

# services/rag/Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Only basic dependencies are needed
COPY services/rag/requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY services/rag/app ./app
COPY shared ./shared

ENV PYTHONPATH=/app
ENV PORT=8080

EXPOSE 8080

CMD ["python", "-m", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]

A minimal requirements file

# services/rag/requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0

# GCP services
google-cloud-discoveryengine==0.11.0
google-cloud-storage==2.10.0
google-cloud-bigquery==3.11.4

# Basic utilities
aiofiles==23.2.1
python-multipart==0.0.6

One-command deployment script

#!/bin/bash
# scripts/deploy-agent-builder-rag.sh

PROJECT_ID="your-project-id"
REGION="asia-east1"

echo "🚀 部署 Agent Builder RAG 服務..."

# 1. Set up Agent Builder
./scripts/setup-agent-builder.sh

# 2. Set up the document processing pipeline
./scripts/setup-document-triggers.sh

# 3. Deploy the RAG service (assumes the image has already been built and pushed, e.g. with Cloud Build)
gcloud run deploy rag-service-v2 \
    --image=gcr.io/$PROJECT_ID/rag-service:latest \
    --region=$REGION \
    --set-env-vars="GCP_PROJECT_ID=$PROJECT_ID,DOCUMENTS_BUCKET=${PROJECT_ID}-documents" \
    --service-account=rag-service-sa@$PROJECT_ID.iam.gserviceaccount.com \
    --min-instances=1 \
    --max-instances=10 \
    --memory=1Gi \
    --cpu=1 \
    --timeout=300s \
    --allow-unauthenticated

echo "✅ 部署完成!"
echo "📊 Agent Builder 控制台: https://console.cloud.google.com/ai/discovery-engine"
echo "🔗 RAG API: https://rag-service-v2-xxx.run.app"

7) Practical comparison

Building RAG yourself vs. Agent Builder

| Item | Self-built RAG | Agent Builder RAG |
| --- | --- | --- |
| Development time | 2-4 weeks | 2-4 days |
| Lines of code | 2000+ | < 500 |
| Maintenance cost | High (you run the vector DB and indexing) | Low (managed by GCP) |
| Scalability | Handle it yourself | Auto-scales |
| Accuracy | Depends on implementation quality | Google-tuned models |
| Multilingual support | Extra development needed | Built in |
| Monitoring and debugging | Build your own | GCP console |

Cost comparison (estimated monthly cost)

| Component | Self-built cost | Agent Builder cost |
| --- | --- | --- |
| Compute | Cloud Run: $50 | Cloud Run: $20 |
| Vector database | Vector Search: $200 | Included in Agent Builder |
| Document processing | Document AI: $100 | Included in Agent Builder |
| Retrieval API | Self-maintained: $100/month | $0.002/query |
| Total | ~$450/month | ~$150/month (10k queries) |

8) Integrating with chat-service (even simpler)

# services/chat/app/handlers.py (Agent Builder version)
# Module-level import so the request classes are in scope in the method below
from google.cloud import discoveryengine_v1 as discoveryengine

class ChatHandler:
    def __init__(self):
        # ... existing code ...

        # Agent Builder client
        self.conversation_client = discoveryengine.ConversationalSearchServiceClient()
        self.conversation_app_path = f"projects/{settings.gcp_project_id}/locations/global/conversationApps/ai-assistant-rag-chat"

    async def _agent_builder_response(self, message: str, user_id: str, conversation_id: str = None) -> str:
        """Converse through Agent Builder"""
        try:
            # Call the Agent Builder conversation API directly
            request = discoveryengine.ConverseConversationRequest(
                name=f"{self.conversation_app_path}/conversations/{conversation_id or '-'}",
                query=discoveryengine.TextInput(input=message),
                serving_config=f"{self.conversation_app_path}/servingConfigs/default_config",
                user_pseudo_id=user_id
            )

            response = self.conversation_client.converse_conversation(request)

            # Agent Builder already combines retrieval + generation
            return response.reply.summary.summary_text

        except Exception as e:
            logger.error(f"Agent Builder conversation failed: {e}")
            # Fall back to plain Gemini
            return await self._call_gemini(message)
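
As a rough illustration of how chat-service might route an incoming message through this handler (the rest of ChatHandler, its settings, and the _call_gemini fallback come from the earlier posts and are assumed here):

# Hypothetical glue inside chat-service; only _agent_builder_response above is used.
import asyncio

async def handle_incoming_message(handler: "ChatHandler", user_id: str, text: str) -> str:
    # Prefer the grounded Agent Builder answer; the method itself falls back
    # to plain Gemini if the conversation API call fails.
    return await handler._agent_builder_response(
        message=text,
        user_id=user_id,
        conversation_id=None,  # None starts a new conversation; pass an ID to continue one
    )

# Example:
# reply = asyncio.run(handle_incoming_message(ChatHandler(), "user-123", "What is our leave policy?"))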



Previous post
Hands-on deployment: from localhost to GCP production
Next post
Firebase + Pub/Sub real-time push: real-time conversations on GCP
Series
來都來了,那就做一個GCP從0到100的AI助理 (29 articles)